/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package studio.raptor.ddal.jdbc.processor;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;

import studio.raptor.ddal.core.connection.BackendConnection;
import studio.raptor.ddal.core.connection.ContextConnectionWrapper;
import studio.raptor.ddal.core.constants.FlowType;
import studio.raptor.ddal.core.engine.ProcessEngine;
import studio.raptor.ddal.core.engine.Processor;
import studio.raptor.ddal.jdbc.RaptorResultSet;

/**
 * Processor for plain (non-prepared) SQL statements.
 * <ol>
 *   <li>Populates the execution context (ProcessContext) with the statement text and flags.</li>
 *   <li>Runs the execution flow through the {@link ProcessEngine}.</li>
 * </ol>
 *
 * @author Sam
 * @since 1.0
 */
public class StatementProcessor extends Processor {

  public StatementProcessor(ProcessEngine engine) {
    super(engine);
  }

  @Override
  public WorkingMode getWorkingMode() {
    return WorkingMode.JDBC;
  }

  /**
   * Execute a SQL statement that returns a single ResultSet.
   *
   * @param sql typically a static SQL SELECT statement
   * @return a ResultSet that wraps the merged result produced by the query
   * @throws SQLException if a database access error occurs
   */
  public ResultSet executeQuery(final String sql) throws SQLException {
    applyStaticStatementSql(sql);
    executeInternal();
    return new RaptorResultSet(getProcessContext().getMergedResult());
  }

  /**
   * Execute a SQL statement that may return multiple results. We don't have
   * to worry about this since we do not support multiple ResultSets. You can
   * use getResultSet or getUpdateCount to retrieve the result.
   *
   * <p>NOTE(review): this implementation always returns {@code true}, regardless of
   * the kind of statement that was executed - confirm callers do not rely on the
   * JDBC meaning of the return value.
   *
   * @param sql any SQL statement
   * @return always {@code true}
   * @throws SQLException if a database access error occurs
   */
  public boolean execute(String sql) throws SQLException {
    applyStaticStatementSql(sql);
    executeInternal();
    return true;
  }

  /**
   * Execute a SQL statement and report the affected row count of the merged result.
   *
   * @param sql the SQL statement to execute
   * @return the affected row count taken from the merged result
   * @throws SQLException if a database access error occurs
   */
  public int executeUpdate(String sql) throws SQLException {
    applyStaticStatementSql(sql);
    executeInternal();
    return getProcessContext().getMergedResult().getAffectedRows();
  }

  /**
   * Execute a SQL statement, optionally requesting generated keys for this call only.
   * The context's previous "retrieve generated keys" setting is restored afterwards,
   * whether the execution succeeds or fails.
   *
   * @param sql the SQL statement to execute
   * @param autoGeneratedKeys generated keys are requested only when this equals
   *     {@link Statement#RETURN_GENERATED_KEYS}
   * @return the affected row count taken from the merged result
   * @throws SQLException if a database access error occurs
   */
  public int executeUpdate(final String sql, final int autoGeneratedKeys) throws SQLException {
    final boolean previousRetrieveGeneratedKeys = getProcessContext().isRetrieveGeneratedKeys();
    getProcessContext().setRetrieveGeneratedKeys(autoGeneratedKeys == Statement.RETURN_GENERATED_KEYS);
    try {
      applyStaticStatementSql(sql);
      executeInternal();
    } finally {
      // Restore even on failure so the per-call setting cannot leak into later statements.
      getProcessContext().setRetrieveGeneratedKeys(previousRetrieveGeneratedKeys);
    }
    return getProcessContext().getMergedResult().getAffectedRows();
  }

  /**
   * Execute a SQL statement and report the affected row count of the merged result.
   *
   * @param sql the SQL statement to execute
   * @param columnIndexes not used by this implementation
   * @return the affected row count taken from the merged result
   * @throws SQLException if a database access error occurs
   */
  public int executeUpdate(final String sql, final int[] columnIndexes) throws SQLException {
    applyStaticStatementSql(sql);
    executeInternal();
    return getProcessContext().getMergedResult().getAffectedRows();
  }

  /**
   * Execute a SQL statement and report the affected row count of the merged result.
   *
   * @param sql the SQL statement to execute
   * @param columnNames not used by this implementation
   * @return the affected row count taken from the merged result
   * @throws SQLException if a database access error occurs
   */
  public int executeUpdate(final String sql, final String[] columnNames) throws SQLException {
    applyStaticStatementSql(sql);
    executeInternal();
    return getProcessContext().getMergedResult().getAffectedRows();
  }

  /**
   * Execute a SQL statement, optionally requesting generated keys for this call only.
   * The context's previous "retrieve generated keys" setting is restored afterwards,
   * whether the execution succeeds or fails.
   *
   * <p>NOTE(review): this overload always returns {@code true} while the
   * column-index / column-name overloads always return {@code false} - confirm
   * which value callers expect.
   *
   * @param sql the SQL statement to execute
   * @param autoGeneratedKeys generated keys are requested only when this equals
   *     {@link Statement#RETURN_GENERATED_KEYS}
   * @return always {@code true}
   * @throws SQLException if a database access error occurs
   */
  public boolean execute(final String sql, final int autoGeneratedKeys) throws SQLException {
    final boolean previousRetrieveGeneratedKeys = getProcessContext().isRetrieveGeneratedKeys();
    getProcessContext().setRetrieveGeneratedKeys(autoGeneratedKeys == Statement.RETURN_GENERATED_KEYS);
    try {
      // Marks the context as non-prepared, like every other entry point of this class.
      applyStaticStatementSql(sql);
      executeInternal();
    } finally {
      // Restore even on failure so the per-call setting cannot leak into later statements.
      getProcessContext().setRetrieveGeneratedKeys(previousRetrieveGeneratedKeys);
    }
    return true;
  }

  /**
   * Execute a SQL statement.
   *
   * @param sql the SQL statement to execute
   * @param columnIndexes not used by this implementation
   * @return always {@code false}
   * @throws SQLException if a database access error occurs
   */
  public boolean execute(final String sql, final int[] columnIndexes) throws SQLException {
    applyStaticStatementSql(sql);
    executeInternal();
    return false;
  }

  /**
   * Execute a SQL statement.
   *
   * @param sql the SQL statement to execute
   * @param columnNames not used by this implementation
   * @return always {@code false}
   * @throws SQLException if a database access error occurs
   */
  public boolean execute(final String sql, final String[] columnNames) throws SQLException {
    applyStaticStatementSql(sql);
    executeInternal();
    return false;
  }

  /**
   * Single entry point for executing a SQL statement in integrated mode.
   * <p>
   * When execution fails, the physical connections held by the current context
   * are returned so that they are not occupied for a long time. A failure while
   * closing one connection does not stop the remaining cleanup; it is attached to
   * the original exception as a suppressed exception, and the original exception
   * is the one rethrown.
   *
   * @throws SQLException SQL exception
   */
  void executeInternal() throws SQLException {
    try {
      getProcessEngine().process(FlowType.DML);
    } catch (Exception ge) {
      // Clear the parameters recorded in the context.
      getProcessContext().getSqlParameters().clear();
      // Return the backend connections and forget them.
      releaseBackendConnections(ge);
      throw ge;
    }
  }

  /**
   * Records a static (non-prepared) SQL text in the process context.
   *
   * @param sql the SQL statement about to be executed
   */
  private void applyStaticStatementSql(final String sql) {
    getProcessContext().setIsPreparedStatement(false);
    getProcessContext().setOriginSql(sql);
  }

  /**
   * Closes every backend connection referenced by the context and then empties the
   * context's connection map.
   * <p>
   * After a failed execution the backend connections are removed from the
   * RaptorConnection context; the next SQL request fetches connections from the
   * backend pool again, which avoids returning the same connection to the pool
   * more than once.
   *
   * @param cause the execution failure; close failures are added to it as suppressed
   */
  private void releaseBackendConnections(final Exception cause) {
    final Map<String, ContextConnectionWrapper> wrappers =
        getProcessContext().getShardBackendConnWrapper();
    if (null == wrappers) {
      // Nothing to release; also avoids dereferencing a missing map.
      return;
    }
    for (ContextConnectionWrapper wrapper : wrappers.values()) {
      if (null == wrapper) {
        continue;
      }
      closeAndRecordFailure(wrapper.getReadWriteConnection(), cause);
      closeAndRecordFailure(wrapper.getReadonlyConnection(), cause);
    }
    wrappers.clear();
  }

  /**
   * Closes one backend connection, recording (instead of propagating) any close failure.
   *
   * @param connection the connection to close; may be {@code null}
   * @param cause the execution failure that receives close failures as suppressed
   */
  private void closeAndRecordFailure(final BackendConnection connection, final Exception cause) {
    if (null == connection) {
      return;
    }
    try {
      connection.close();
    } catch (Exception closeFailure) {
      // Keep going: the remaining connections must still be returned.
      cause.addSuppressed(closeFailure);
    }
  }
}
